home *** CD-ROM | disk | FTP | other *** search
/ AmigActive 23 / AACD 23.iso / AACD / Programming / tek / kn / sockcommon / waitclientsock.c < prev    next >
Encoding:
C/C++ Source or Header  |  2001-05-12  |  11.2 KB  |  482 lines

  1.  
  /*
  **    forward declarations of the static helpers that
  **    kn_waitclientsock() calls when the descriptor becomes
  **    readable or writeable.
  */

  static int clientprocread(struct knclientsocket *s);
  static int clientprocwrite(struct knclientsocket *s, struct knclinode *cnode);
  4.  
  5. /*
  6. **
  7. **    newmsg = kn_waitclientsock(sock, event)
  8. **
  9. **    process messages from socket and wait for an event to occur.
  10. **    returns the number of messages processed.
  11. **
  12. **    TODO: only abandon single messages on individual timeouts,
  13. **    not the entire socket!
  14. */
  15.  
  16. TUINT kn_waitclientsock(TAPTR knsock, TKNOB *event)
  17. {
  18.     struct knclientsocket *s = (struct knclientsocket *) knsock;
  19.     kn_sockenv_t *sockenv = &s->sockenv;
  20.  
  21.     int numready;
  22.     TUINT numprocessed = 0;
  23.     TBOOL signal_occured;
  24.  
  25.     if (s->status == SOCKSTATUS_BROKEN)
  26.     {
  27.         struct timeval waittimeout = {0, TIMEOUT_USEC};
  28.  
  29.         /*
  30.         **    spool out pending replies
  31.         */
  32.         
  33.         struct knclinode *cnode;
  34.         TUINT deliver = 0;
  35.         
  36.         while ((cnode = (struct knclinode *) TRemHead(&s->readlist)))
  37.         {
  38.             cnode->msg->status = TMSG_STATUS_FAILED;
  39.             TAddTail(&s->deliverlist, (TNODE *) cnode);
  40.             deliver++;
  41.         }
  42.  
  43.         while ((cnode = (struct knclinode *) TRemHead(&s->writelist)))
  44.         {
  45.             cnode->msg->status = TMSG_STATUS_FAILED;
  46.             TAddTail(&s->deliverlist, (TNODE *) cnode);
  47.             deliver++;
  48.         }
  49.         
  50.         if (deliver)
  51.         {
  52.             dbsprintf(2, "*** TEKLIB kn_waitclientsock: spooling out pending replies from broken socket\n");
  53.             return deliver;
  54.         }
  55.  
  56.         dbsprintf(2, "*** TEKLIB kn_waitclientsock: waiting for event on broken socket\n");
  57.         kn_timedwaitevent(event, s->timer, (TTIME *) &waittimeout);
  58.         return 0;
  59.     }
  60.  
  61.     if (!TListEmpty(&s->deliverlist))
  62.     {
  63.         return 0;
  64.     }
  65.  
  66.     do
  67.     {
  68.         int do_select = 0;
  69.         if (TListEmpty(&s->readlist))
  70.         {
  71.             FD_CLR(s->desc, &s->readset);
  72.         }
  73.         else
  74.         {
  75.             FD_SET(s->desc, &s->readset);
  76.             do_select = 1;
  77.         }
  78.  
  79.         if (TListEmpty(&s->writelist))
  80.         {
  81.             FD_CLR(s->desc, &s->writeset);
  82.         }
  83.         else
  84.         {
  85.             FD_SET(s->desc, &s->writeset);
  86.             do_select = 1;
  87.         }
  88.  
  89.         if (do_select)
  90.         {
  91.             struct timeval waittimeout = {0, TIMEOUT_USEC};
  92.  
  93.             dbsprintf(2, "*** kn_waitclientsock: select\n");
  94.             numready = kn_waitselect(sockenv, FD_SETSIZE, &s->readset, &s->writeset, NULL, &waittimeout, event, &signal_occured);
  95.             if (numready > 0)
  96.             {
  97.                 if (FD_ISSET(s->desc, &s->readset))
  98.                 {
  99.                     numprocessed += clientprocread(s);
  100.                 }
  101.                 else if (FD_ISSET(s->desc, &s->writeset))
  102.                 {
  103.                     struct knclinode *cnode;
  104.                     while ((cnode = (struct knclinode *) TFirstNode(&s->writelist)))
  105.                     {
  106.                         if (!clientprocwrite(s, cnode))
  107.                         {
  108.                             break;
  109.                         }
  110.                         numprocessed++;
  111.                     }
  112.                 }
  113.                 else dbsprintf(20, "*** kn_waitclientsock: unknown descriptor\n");
  114.             }
  115.             else if (numready < 0)
  116.             {
  117.                 dbsprintf(10, "*** kn_waitclientsocket: select()\n");
  118.             }
  119.         }
  120.         else
  121.         {
  122.             TTIME tektime = {0, TIMEOUT_USEC};
  123.         
  124.             signal_occured = kn_timedwaitevent(event, s->timer, &tektime);
  125.  
  126.             dbsprintf(2, "*** kn_waitclientsock: timedwaitevent\n");
  127.         }
  128.  
  129.         /*if (!numprocessed)*/
  130.         {
  131.             /*    handle timeout */
  132.  
  133.             TNODE *nextnode, *node = s->readlist.head;
  134.             struct knclinode *cnode;
  135.             TTIME now;
  136.             TFLOAT nowf;
  137.  
  138.             kn_querytimer(s->timer, &now);
  139.             nowf = TTIMETOF(&now);
  140.  
  141.             dbsprintf(3, "*** TEKLIB kn_waitclientsock: checking timeouts on readlist\n");
  142.             while ((nextnode = node->succ))
  143.             {
  144.                 cnode = (struct knclinode *) node;
  145.                 if (nowf - cnode->timestamp > s->msgtimeout)
  146.                 {
  147.                     /*
  148.                     **    TODO: only abandon THIS message, not the entire socket!
  149.                     */
  150.                 
  151.                     dbsprintf(5, "*** TEKLIB kn_waitclientsock: message timeout!\n");
  152.  
  153.                     s->status = SOCKSTATUS_BROKEN;
  154.                     numprocessed = 1;
  155.                     break;
  156.                 }
  157.                 node = nextnode;
  158.             }
  159.         }
  160.  
  161.     } while (!numprocessed && !signal_occured);
  162.  
  163.     return numprocessed;
  164. }
  165.  
  166.  
  167.  
  168. /*
  169. **    processed = clientprocwrite(knclientsock, knclinode)
  170. **
  171. **    write pending data from a client connection node.
  172. */
  173.  
  174. static int clientprocwrite(struct knclientsocket *s, struct knclinode *cnode)
  175. {
  176.     kn_sockenv_t *sockenv = &s->sockenv;
  177.     int numwritten;
  178.  
  179.     if (cnode->bytesdone < sizeof(knnethead))
  180.     {
  181.         kn_locksock(sockenv);
  182.     
  183.         numwritten = send(s->desc,
  184.             ((char *) &cnode->nethead) + cnode->bytesdone,
  185.             sizeof(knnethead) - cnode->bytesdone, KNSOCK_SENDFLAGS);
  186.         
  187.         kn_unlocksock(sockenv);
  188.         
  189.         if (numwritten == -1)
  190.         {
  191.             if (kn_getsockerrno(sockenv, s->desc) != EWOULDBLOCK)
  192.             {
  193.                 dbsprintf(10, "*** TEKLIB: clientprocwrite: dropping connection due to unexpected error\n");
  194.                 s->status = SOCKSTATUS_BROKEN;
  195.                 TRemove((TNODE *) cnode);
  196.                 if (cnode->msg->replyport)
  197.                 {
  198.                     cnode->msg->status = TMSG_STATUS_FAILED;
  199.                     TAddTail(&s->deliverlist, (TNODE *) cnode);
  200.                 }
  201.                 else
  202.                 {
  203.                     TFreeMsg(cnode->msg + 1);
  204.                     TAddTail(&s->freelist, (TNODE *) cnode);
  205.                 }
  206.                 return 1;
  207.             }
  208.             dbsprintf(5, "*** TEKLIB clientprocwrite: send(1) would block\n");
  209.             return 0;            /* msg header not yet complete, but no more data pending */
  210.         }
  211.  
  212.         cnode->bytesdone += numwritten;
  213.         
  214.         if (cnode->bytesdone == sizeof(knnethead))
  215.         {
  216.             if (cnode->bytesdone == cnode->bytestowrite)
  217.             {
  218.                 /* msg complete */
  219.  
  220.                 TRemove((TNODE *) cnode);
  221.                 if (cnode->msg->replyport)
  222.                 {
  223.                     dbsprintf(3, "*** TEKLIB clientprocwrite: two-way message delivered\n");
  224.                     cnode->msg->status = TMSG_STATUS_SENT;
  225.                     TAddTail(&s->readlist, (TNODE *) cnode);
  226.                 }
  227.                 else
  228.                 {
  229.                     dbsprintf(3, "*** TEKLIB clientprocwrite: one-way messages processed\n");
  230.  
  231.                     TFreeMsg(cnode->msg + 1);
  232.                     TAddTail(&s->freelist, (TNODE *) cnode);
  233.                 }
  234.                 return 1;
  235.             }
  236.         }
  237.         else
  238.         {
  239.             return 0;            /* msg header not yet complete, but no more data pending */
  240.         }
  241.     }
  242.     
  243.  
  244.     /*    write msg body */
  245.  
  246.     kn_locksock(sockenv);
  247.     
  248.     numwritten = send(s->desc,
  249.         ((char *) cnode->msg) + cnode->bytesdone - sizeof(knnethead) + sizeof(TMSG),
  250.         cnode->bytestowrite - cnode->bytesdone, KNSOCK_SENDFLAGS);
  251.  
  252.     kn_unlocksock(sockenv);
  253.     
  254.     if (numwritten == -1)
  255.     {
  256.         if (kn_getsockerrno(sockenv, s->desc) != EWOULDBLOCK)
  257.         {
  258.             dbsprintf(10, "*** TEKLIB: clientprocwrite: dropping connection due to unexpected error\n");
  259.  
  260.             s->status = SOCKSTATUS_BROKEN;
  261.             TRemove((TNODE *) cnode);
  262.             if (cnode->msg->replyport)
  263.             {
  264.                 cnode->msg->status = TMSG_STATUS_FAILED;
  265.                 TAddTail(&s->deliverlist, (TNODE *) cnode);
  266.             }
  267.             else
  268.             {
  269.                 TFreeMsg(cnode->msg + 1);
  270.                 TAddTail(&s->freelist, (TNODE *) cnode);
  271.             }
  272.             return 1;
  273.         }
  274.         dbsprintf(5, "*** TEKLIB clientprocwrite: send(2) would block\n");
  275.         return 0;
  276.     }
  277.     
  278.     cnode->bytesdone += numwritten;
  279.  
  280.     if (cnode->bytesdone == cnode->bytestowrite)
  281.     {
  282.         /* msg complete */
  283.  
  284.         TRemove((TNODE *) cnode);
  285.         if (cnode->msg->replyport)
  286.         {
  287.             dbsprintf(3, "*** TEKLIB clientprocwrite: two-way message delivered(2)\n");
  288.             cnode->msg->status = TMSG_STATUS_SENT;
  289.             TAddTail(&s->readlist, (TNODE *) cnode);
  290.         }
  291.         else
  292.         {
  293.             dbsprintf(3, "*** TEKLIB clientprocwrite: one-way message processed(2)\n");
  294.  
  295.             TFreeMsg(cnode->msg + 1);
  296.             cnode->msg = TNULL;
  297.             TAddTail(&s->freelist, (TNODE *) cnode);
  298.         }
  299.         return 1;
  300.     }
  301.  
  302.     return 0;
  303. }
  304.  
  305.  
  306.  
  307. /*
  308. **    processed = clientprocread(knclientsock)
  309. **
  310. **    read pending data, and insert it to a client connection node.
  311. */
  312.  
  313. static int clientprocread(struct knclientsocket *s)
  314. {
  315.     kn_sockenv_t *sockenv = &s->sockenv;
  316.  
  317.     int numread;
  318.     TUINT size, proto, msgID, version;
  319.     TNODE *node, *nextnode;
  320.  
  321.     if (s->bytesdone < sizeof(knnethead))
  322.     {
  323.         /*    read msg header */
  324.  
  325.         kn_locksock(sockenv);
  326.  
  327.         numread = recv(s->desc, ((char *) &s->nethead) + s->bytesdone, sizeof(knnethead) - s->bytesdone, KNSOCK_RECVFLAGS);
  328.  
  329.         kn_unlocksock(sockenv);
  330.  
  331.         if (numread <= 0)
  332.         {
  333.             int sockerr = kn_getsockerrno(sockenv, s->desc);
  334.             
  335.             if (numread == 0 || sockerr != EWOULDBLOCK)
  336.             {
  337.                 if (numread == 0)
  338.                 {
  339.                     dbsprintf(5, "*** TEKLIB clientprocread: end-of-file on descriptor - dropping connection\n");
  340.                 }
  341.                 else if (sockerr != EWOULDBLOCK)
  342.                 {
  343.                     dbsprintf(20, "*** TEKLIB clientprocread: unexpected error on descriptor - dropping connection\n");
  344.                 }
  345.                 
  346.                 s->status = SOCKSTATUS_BROKEN;
  347.                 s->bytesdone = 0;
  348.                 return 1;
  349.             }
  350.             else dbsprintf(3, "*** clientprocread: recv(1) would block\n");
  351.             return 0;
  352.         }
  353.         
  354.         s->bytesdone += numread;
  355.         
  356.         if (s->bytesdone != sizeof(knnethead))
  357.         {
  358.             return 0;
  359.         }
  360.         
  361.         proto = s->nethead.protocol;
  362.         version = s->nethead.version;
  363.  
  364.         if (version == KNSOCK_VERSION && (proto == KNSOCK_PROTO_REPLY || proto == KNSOCK_PROTO_ACK))
  365.         {
  366.             msgID = s->nethead.msgID = ntohl(s->nethead.msgID);
  367.             
  368.             /*    find matching message to this reply from network */
  369.             
  370.             node = s->readlist.head;
  371.             while ((nextnode = node->succ))
  372.             {
  373.                 if (((struct knclinode *) node)->msgID == msgID)
  374.                 {
  375.                     struct knclinode *cnode = (struct knclinode *) node;
  376.                 
  377.                     size = s->nethead.msgsize = ntohl(s->nethead.msgsize);
  378.                     
  379.                     if (size == sizeof(knnethead) && proto == KNSOCK_PROTO_ACK)
  380.                     {
  381.                         /* ackmsg complete - link to deliver list */
  382.                 
  383.                         dbsprintf(3, "*** TEKLIB clientprocread: delivering complete ack\n");
  384.                         TRemove(node);
  385.                         cnode->msg->status = TMSG_STATUS_ACKD;
  386.                         TAddTail(&s->deliverlist, node);
  387.                         s->clientnode = TNULL;
  388.                         s->bytesdone = 0;
  389.                         return 1;
  390.                     }
  391.  
  392.                     if (proto == KNSOCK_PROTO_REPLY && size == cnode->msg->size - sizeof(TMSG) + sizeof(knnethead))
  393.                     {
  394.                         dbsprintf(3, "*** TEKLIB clientprocread: identified proper reply\n");
  395.                         s->clientnode = cnode;
  396.                         break;
  397.                     }
  398.  
  399.                     dbsprintf(20, "*** TEKLIB clientprocread: illegal message size for correct msgID - dropping\n");
  400.  
  401.                     s->status = SOCKSTATUS_BROKEN;
  402.                     TRemove(node);
  403.                     cnode->msg->status = TMSG_STATUS_FAILED;
  404.                     TAddTail(&s->deliverlist, node);
  405.                     s->bytesdone = 0;
  406.                     return 1;
  407.                 }
  408.                 node = nextnode;
  409.             }
  410.         }
  411.         
  412.         if (!s->clientnode)
  413.         {
  414.             dbsprintf(20, "*** TEKLIB clientprocread: illegal message - dropping connection\n");
  415.  
  416.             s->status = SOCKSTATUS_BROKEN;
  417.             s->bytesdone = 0;
  418.             return 1;
  419.         }
  420.     }
  421.  
  422.     if (s->bytesdone < s->clientnode->msg->size - sizeof(TMSG) + sizeof(knnethead))
  423.     {
  424.         /*    read msg body */
  425.  
  426.         kn_locksock(sockenv);
  427.     
  428.         numread = recv(s->desc,
  429.             ((char *) s->clientnode->msg) + s->bytesdone - sizeof(knnethead) + sizeof(TMSG),
  430.             s->clientnode->msg->size - sizeof(TMSG) + sizeof(knnethead) - s->bytesdone, KNSOCK_RECVFLAGS);
  431.  
  432.         kn_unlocksock(sockenv);
  433.  
  434.         if (numread <= 0)
  435.         {
  436.             int sockerr = kn_getsockerrno(sockenv, s->desc);
  437.             if (numread == 0 || sockerr != EWOULDBLOCK)
  438.             {
  439.                 if (numread == 0)
  440.                 {
  441.                     dbsprintf(5, "*** TEKLIB clientprocread(2): end-of-file on descriptor - dropping connection\n");
  442.                 }
  443.                 else if (sockerr != EWOULDBLOCK)
  444.                 {
  445.                     dbsprintf(20, "*** TEKLIB clientprocread(2): unexpected error on descriptor - dropping connection\n");
  446.                 }
  447.  
  448.                 s->status = SOCKSTATUS_BROKEN;
  449.                 TRemove((TNODE *) s->clientnode);
  450.                 s->clientnode->msg->status = TMSG_STATUS_FAILED;
  451.                 TAddTail(&s->deliverlist, (TNODE *) s->clientnode);
  452.                 s->clientnode = TNULL;
  453.                 s->bytesdone = 0;
  454.                 return 1;
  455.             }
  456.             else dbsprintf(3, "*** clientprocread: recv(2) would block\n");
  457.  
  458.             return 0;    /* msg header not yet complete, but no more data pending */
  459.         }
  460.         
  461.         s->bytesdone += numread;
  462.     }
  463.  
  464.     if (s->bytesdone == s->clientnode->msg->size - sizeof(TMSG) + sizeof(knnethead))
  465.     {
  466.         /* replymsg complete - link to deliver list */
  467.  
  468.         dbsprintf(3, "*** TEKLIB clientprocread: delivering complete reply\n");
  469.         TRemove((TNODE *) s->clientnode);
  470.  
  471.         s->clientnode->msg->status = TMSG_STATUS_REPLIED;
  472.             /*s->clientnode->msg->status = s->nethead.protocol == TMSG_STATUS_REPLIED;         ???? */
  473.         
  474.         TAddTail(&s->deliverlist, (TNODE *) s->clientnode);
  475.         s->clientnode = TNULL;
  476.         s->bytesdone = 0;
  477.         return 1;
  478.     }
  479.  
  480.     return 0;
  481. }
  482.